{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark with MLRun example"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This example notebook demonstrates how to execute a spark job with MLRun.\n",
    "\n",
    "Our spark job is a generic ETL job which pulls data from user-defined data sources, applies a SQL query on top of them, and writes the result to a user defined destination.\n",
    "\n",
    "The definition of the input-sources should be according to : https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader\n",
    "\n",
    "The definition of the output destination should be according to :\n",
    "https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter"
   ]
  },
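  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The ETL logic itself lives in `spark-function.py` next to this notebook. The script is not reproduced here; the following is a minimal, hypothetical sketch of what such a script could look like, assuming it reads the task parameters through the MLRun context (`get_or_create_ctx`) and passes the reader/writer options through generically (the app name `spark-etl` is arbitrary):\n",
    "\n",
    "```python\n",
    "# Hypothetical sketch of spark-function.py -- the real script may differ\n",
    "from pyspark.sql import SparkSession\n",
    "\n",
    "from mlrun import get_or_create_ctx\n",
    "\n",
    "# Read the task parameters that MLRun passes to the job\n",
    "ctx = get_or_create_ctx(\"spark-etl\")\n",
    "data_sources = ctx.get_param(\"data_sources\", {})\n",
    "query = ctx.get_param(\"query\", \"\")\n",
    "write_options = dict(ctx.get_param(\"write_options\", {}))\n",
    "\n",
    "spark = SparkSession.builder.appName(\"spark-etl\").getOrCreate()\n",
    "\n",
    "# Load each source with its DataFrameReader options and register it\n",
    "# as a temporary view named after its dictionary key\n",
    "for name, options in data_sources.items():\n",
    "    options = dict(options)  # copy so we can pop the format\n",
    "    fmt = options.pop(\"format\")\n",
    "    spark.read.format(fmt).options(**options).load().createOrReplaceTempView(name)\n",
    "\n",
    "# Run the user-defined query on the registered views\n",
    "result = spark.sql(query)\n",
    "\n",
    "# Write the result with the DataFrameWriter options\n",
    "# (save() picks up the 'path' option when no path argument is given)\n",
    "fmt = write_options.pop(\"format\")\n",
    "mode = write_options.pop(\"mode\", \"errorifexists\")\n",
    "result.write.format(fmt).mode(mode).options(**write_options).save()\n",
    "\n",
    "spark.stop()\n",
    "```"
   ]
  },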
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from os.path import isfile, join\n",
    "from mlrun import new_function, new_task, mlconf\n",
    "\n",
    "# Set the mlrun database/api\n",
    "mlconf.dbpath = \"http://mlrun-api:8080\"\n",
    "\n",
    "# Set the pyspark script path\n",
    "V3IO_WORKING_DIR = os.getcwd().replace(\"/User\", \"/v3io/\" + os.getenv(\"V3IO_HOME\"))\n",
    "V3IO_SCRIPT_PATH = V3IO_WORKING_DIR + \"/spark-function.py\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define a task (job parameters)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Define a dict of input data sources\n",
    "DATA_SOURCES = {\n",
    "    \"family\": {\n",
    "        \"format\": \"jdbc\",\n",
    "        \"url\": \"jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam\",\n",
    "        \"dbtable\": \"Rfam.family\",\n",
    "        \"user\": \"rfamro\",\n",
    "        \"password\": \"\",\n",
    "        \"driver\": \"com.mysql.jdbc.Driver\",\n",
    "    },\n",
    "    \"full_region\": {\n",
    "        \"format\": \"jdbc\",\n",
    "        \"url\": \"jdbc:mysql://mysql-rfam-public.ebi.ac.uk:4497/Rfam\",\n",
    "        \"dbtable\": \"Rfam.full_region\",\n",
    "        \"user\": \"rfamro\",\n",
    "        \"password\": \"\",\n",
    "        \"driver\": \"com.mysql.jdbc.Driver\",\n",
    "    },\n",
    "}\n",
    "\n",
    "# Define a query to execute on the input data sources\n",
    "QUERY = \"SELECT family.*, full_region.evalue_score from family INNER JOIN full_region ON family.rfam_acc = full_region.rfam_acc  LIMIT 10\"\n",
    "\n",
    "# Define the output destination\n",
    "WRITE_OPTIONS = {\n",
    "    \"format\": \"io.iguaz.v3io.spark.sql.kv\",\n",
    "    \"mode\": \"overwrite\",\n",
    "    \"key\": \"rfam_id\",\n",
    "    \"path\": \"v3io://users/admin/frommysql\",\n",
    "}\n",
    "\n",
    "# Create a task execution with parameters\n",
    "PARAMS = {\"data_sources\": DATA_SOURCES, \"query\": QUERY, \"write_options\": WRITE_OPTIONS}\n",
    "\n",
    "SPARK_TASK = new_task(params=PARAMS)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Download mysql driver"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run locally (in the notebook or attched Spark service)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Get the list of the dpendency jars\n",
    "V3IO_JARS_PATH = \"/igz/java/libs/\"\n",
    "DEPS_JARS_LIST = [\n",
    "    join(V3IO_JARS_PATH, f)\n",
    "    for f in os.listdir(V3IO_JARS_PATH)\n",
    "    if isfile(join(V3IO_JARS_PATH, f)) and f.startswith(\"v3io-\") and f.endswith(\".jar\")\n",
    "]\n",
    "\n",
    "DEPS_JARS_LIST.append(V3IO_WORKING_DIR + \"/mysql-connector-java-8.0.19.jar\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create MLRun function which runs locally in a passthrough mode (since we use spark-submit)\n",
    "local_spark_fn = new_function(\n",
    "    kind=\"local\",\n",
    "    mode=\"pass\",\n",
    "    command=f\"spark-submit --jars {','.join(DEPS_JARS_LIST)} {V3IO_SCRIPT_PATH}\",\n",
    ")\n",
    "\n",
    "# Run the function with a task\n",
    "local_spark_fn.run(SPARK_TASK)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Run as a job on the Kubernetes cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create MLRun function to run the spark-job on the kubernetes cluster\n",
    "serverless_spark_fn = new_function(\n",
    "    kind=\"spark\", command=V3IO_SCRIPT_PATH, name=\"my-spark-func\"\n",
    ")\n",
    "\n",
    "serverless_spark_fn.with_driver_limits(cpu=\"1300m\")\n",
    "serverless_spark_fn.with_driver_requests(\n",
    "    cpu=1, mem=\"4G\"\n",
    ")  # gpu_type & gpus=<number_of_gpus> are supported too\n",
    "serverless_spark_fn.with_executor_limits(cpu=\"1400m\")\n",
    "serverless_spark_fn.with_executor_requests(\n",
    "    cpu=1, mem=\"4G\"\n",
    ")  # gpu_type & gpus=<number_of_gpus> are supported too\n",
    "\n",
    "serverless_spark_fn.with_igz_spark()\n",
    "\n",
    "# Download mysql driver\n",
    "serverless_spark_fn.spec.build.commands += [\n",
    "    \"wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar \"\n",
    "    \"-O /spark/jars/mysql-connector-java-8.0.19.jar\"\n",
    "]\n",
    "\n",
    "# Set number of executors\n",
    "serverless_spark_fn.spec.replicas = 2\n",
    "\n",
    "# Deploy function and install MLRun in the spark image\n",
    "serverless_spark_fn.deploy()\n",
    "\n",
    "run = serverless_spark_fn.run(SPARK_TASK, watch=False)"
   ]
  },
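  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the run was submitted with `watch=False`, `run()` returns right after the job is submitted. As a small sketch using MLRun's `RunObject` helpers (`state()` and `wait_for_completion()`; availability may vary across MLRun versions), you can poll the run or block until it finishes:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Check the current state of the submitted run\n",
    "print(run.state())\n",
    "\n",
    "# Optionally block until the Spark job finishes\n",
    "# (wait_for_completion is assumed to exist on RunObject in your MLRun version)\n",
    "# run.wait_for_completion()"
   ]
  },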
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# Get the spark job's UI URL:\n",
    "run.ui_url"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "mlrun-base",
   "language": "python",
   "name": "conda-env-mlrun-base-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.9.16"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
